 1.Spark Core之RDD编程中Transformation【重要】
   
   RDD的操作算子分为两类：
      Transformation。用来对RDD进行转化，这个操作时延迟执行的(或者说是Lazy 的)；
      Action。用来触发RDD的计算；得到相关计算结果 或者 将结果保存的外部系统中；
	  Transformation：返回一个新的RDD
      Action：返回结果int、double、集合（不会返回新的RDD）
      要很准确区分Transformation、Action
   
   每一次 Transformation 操作都会产生新的RDD，供给下一个“转换”使用；
   转换得到的RDD是惰性求值的。也就是说，整个转换过程只是记录了转换的轨迹，并不
会发生真正的计算，只有遇到 Action 操作时，才会发生真正的计算，开始从血缘关系
(lineage)源头开始，进行物理的转换操作；
   
   常见的 Transformation 算子：
   官方文档：
   http://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations
   
 2.Transformation常见转换算子1
   
   map(func)：对数据集中的每个元素都使用func，然后返回一个新的RDD
   filter(func)：对数据集中的每个元素都使用func，然后返回一个包含使func为true的元
素构成的RDD
   flatMap(func)：与 map 类似，每个输入元素被映射为0或多个输出元素
   mapPartitions(func)：和map很像，但是map是将func作用在每个元素上，而mapPartitions
是func作用在整个分区上。假设一个RDD有N个元素，M个分区(N>> M)，那么map的函数将被调用
N次，而mapPartitions中的函数仅被调用M次，一次处理一个分区中的所有元素
   mapPartitionsWithIndex(func)：与 mapPartitions 类似，多了分区索引值信息
   全部都是窄依赖

val rdd1 = sc.parallelize(1 to 10)
val rdd2 = rdd1.map(_*2)
val rdd3 = rdd2.filter(_>10)

// 以上都是 Transformation 操作，没有被执行。如何证明这些操作按预期执
行，此时需要引入Action算子
rdd2.collect
rdd3.collect

// collect 是Action算子，触发Job的执行，将RDD的全部元素从 Executor
// 搜集到 Driver 端。生产环境中禁用

// flatMap 使用案例
val rdd4 = sc.textFile("wcinput/wc.txt")
rdd4.collect
rdd4.flatMap(_.split("\\s+")).collect

// RDD 是分区，rdd1有几个区，每个分区有哪些元素
rdd1.getNumPartitions
rdd1.partitions.length

rdd1.mapPartitions{iter =>
  Iterator(s"${iter.toList}")
}.collect

rdd1.mapPartitions{iter =>
  Iterator(s"${iter.toArray.mkString("-")}")
}.collect

rdd1.mapPartitionsWithIndex{(idx, iter) =>
  Iterator(s"$idx:${iter.toArray.mkString("-")}")
}.collect

// 每个元素 * 2
val rdd5 = rdd1.mapPartitions(iter => iter.map(_*2))
rdd5.collect
    
	map 与 mapPartitions 的区别
	   map：每次处理一条数据
       mapPartitions：每次处理一个分区的数据，分区的数据处理完成后，数据才能
释放，资源不足时容易导致OOM
      最佳实践：当内存资源充足时，建议使用mapPartitions，以提高处理效率
	  
 3.Transformation常见转换算子2
   
   groupBy(func)：按照传入函数的返回值进行分组。将key相同的值放入一个迭代器
   glom()：将每一个分区形成一个数组，形成新的RDD类型 RDD[Array[T]]
   sample(withReplacement, fraction, seed)：采样算子。以指定的随机种子(seed)
随机抽样出数量为fraction的数据，withReplacement表示是抽出的数据是否放回，true
为有放回的抽样，false为无放回的抽样
   
   distinct([numTasks]))：对RDD元素去重后，返回一个新的RDD。可传入numTasks参数
改变RDD分区数
   coalesce(numPartitions)：缩减分区数，无shuffle
   repartition(numPartitions)：增加或减少分区数，有shuffle
   sortBy(func, [ascending], [numTasks])：使用 func 对数据进行处理，对处理后的
结果进行排序
   宽依赖的算子（shuffle）：groupBy、distinct、repartition、sortBy
   
// 将 RDD 中的元素按照3的余数分组
val rdd = sc.parallelize(1 to 10)
val group = rdd.groupBy(_%3)
group.collect

// 将 RDD 中的元素每10个元素分组
val rdd = sc.parallelize(1 to 101)
val rdd3 = rdd.glom.map(_.sliding(10, 10).toArray)
rdd3.collect
// sliding是Scala中的方法

// 对数据采样。fraction采样的百分比，近似数
// 有放回的采样，使用固定的种子
rdd.sample(true, 0.2, 2).collect
// 无放回的采样，使用固定的种子
rdd.sample(false, 0.2, 2).collect
// 有放回的采样，不设置种子
rdd.sample(false, 0.2).collect

// 数据去重
val random = scala.util.Random
val arr = (1 to 20).map(x => random.nextInt(10))
val rdd = sc.makeRDD(arr)
rdd.distinct.collect

// RDD重分区
val rdd1 = sc.range(1, 10000, numSlices=10)
val rdd2 = rdd1.filter(_%2==0)
rdd2.getNumPartitions

// 减少分区数；都生效了
val rdd3 = rdd2.repartition(5)
rdd3.getNumPartitions
val rdd4 = rdd2.coalesce(5)
rdd4.getNumPartitions

// 增加分区数
val rdd5 = rdd2.repartition(20)
rdd5.getNumPartitions

// 增加分区数，这样使用没有效果
val rdd6 = rdd2.coalesce(20)
rdd6.getNumPartitions

// 增加分区数的正确用法
val rdd6 = rdd2.coalesce(20, true)
rdd6.getNumPartitions

// RDD元素排序
val random = scala.util.Random
val arr = (1 to 20).map(x => random.nextInt(10))
val rdd = sc.makeRDD(arr)
rdd.collect

// 数据全局有序，默认升序
rdd.sortBy(x=>x).collect
// 降序
rdd.sortBy(x=>x,false).collect
    
	coalesce 与 repartition 的区别
	小结：
         repartition：增大或减少分区数；有shuffle
         coalesce：一般用于减少分区数(此时无shuffle)

 4.Transformation常见转换算子3
   
   RDD之间的交、并、差算子，分别如下：
      intersection(otherRDD)
      union(otherRDD)
      subtract (otherRDD)
   cartesian(otherRDD)：笛卡尔积
   zip(otherRDD)：将两个RDD组合成 key-value 形式的RDD，默认两个RDD的partition
数量以及元素数量都相同，否则会抛出异常。
   
   宽依赖的算子（shuffle）：intersection、subtract

val rdd1 = sc.range(1, 21)
val rdd2 = sc.range(10, 31)

rdd1.intersection(rdd2).sortBy(x=>x).collect
// 元素求并集，不去重
rdd1.union(rdd2).sortBy(x=>x).collect
rdd1.subtract(rdd2).sortBy(x=>x).collect

// 检查分区数
rdd1.intersection(rdd2).getNumPartitions
rdd1.union(rdd2).getNumPartitions
rdd1.subtract(rdd2).getNumPartitions

// 笛卡尔积
val rdd1 = sc.range(1, 5)
val rdd2 = sc.range(6, 10)
rdd1.cartesian(rdd2).collect
// 检查分区数
rdd1.cartesian(rdd2).getNumPartitions

// 拉链操作
rdd1.zip(rdd2).collect
rdd1.zip(rdd2).getNumPartitions

// zip操作要求：两个RDD的partition数量以及元素数量都相同，否则会抛出异常
val rdd2 = sc.range(6, 20)
rdd1.zip(rdd2).collect
   
   备注：
        union是窄依赖。得到的RDD分区数为：两个RDD分区数之和
        cartesian是窄依赖
		   得到RDD的元素个数为：两个RDD元素个数的乘积
           得到RDD的分区数为：两个RDD分区数的乘积
           使用该操作会导致数据膨胀，慎用